<?php
require_once(Kohana::find_file('vendor','phprpc/phprpc_client'));

/**
 * Synchronizes queued data changes to the remote database.
 * Created by PhpStorm.
 * User: cai
 * Date: 16-2-6
 * Time: 2:44 PM
 */
class Controller_Triggertable extends Controller
{
    /** Database identifier handed to every execute() call in this controller. */
    const DB_GROUP = 'newerp';

    /** Action_rpcdata() stops looping once more than this many seconds have elapsed. */
    const MAX_RUN_SECONDS = 58;

    /** A row whose counter is not a multiple of this value is skipped for the current pass. */
    const RETRY_INTERVAL = 10;

    /**
     * Collects the queued changes of one warehouse (optionally of one table)
     * from `triggertable` and builds the payload that is sent over RPC.
     *
     * Up to 1000 queue rows are read in ascending id order. For "add" and
     * "update" rows the current source row is loaded from the referenced
     * table; for "delete" rows only the id is sent. Every queue row that was
     * considered gets its `counter` incremented before the method returns.
     *
     * If a source row cannot be loaded, only that queue row's counter is
     * incremented and an EMPTY array is returned, i.e. the whole batch is
     * abandoned for this pass.
     *
     * @param   mixed   $warehouse_id  warehouse id (cast to int)
     * @param   string  $tablename     optional table filter; applied only when it
     *                                 consists solely of word characters
     * @return  array   payload keyed by triggertable id; each entry holds
     *                  data, table, warehouseid, act, tableid, info, addtime
     */
    public function getdata($warehouse_id,$tablename = '')
    {
        $warehouse_id = intval($warehouse_id);
        $where        = "warehouseid={$warehouse_id}";
        // The name is interpolated into the SQL text, so it is only used
        // when it is made up of word characters; otherwise no filter is applied.
        if(preg_match('/^[\w]+$/',$tablename)){
            $where .= " AND tablename='$tablename'";
        }
        $sql               = "SELECT id,warehouseid,tablename,act,tableid,counter,info,addtime
            FROM triggertable
            WHERE ".$where.'
            ORDER BY id asc
            LIMIT 1000';
        $triggertable_list = DB::query(Database::SELECT,$sql)
            ->execute(self::DB_GROUP)
            ->as_array();

        // "add" and "update" are handled identically apart from this marker value.
        $crontrigger_by_act = array('add' => 1,'update' => 2);

        $data        = array();
        $update_ids  = array();
        $has_retries = FALSE;
        foreach($triggertable_list as $trigger){
            if($trigger['counter']>0){
                $has_retries = TRUE;
            }
            if($has_retries&&$trigger['counter']==0){
                // Once a previously attempted row (counter > 0) has been seen,
                // later untouched rows (counter = 0) are skipped.
                continue;
            }

            $update_ids[] = $trigger['id'];

            if(($trigger['counter']%self::RETRY_INTERVAL)!=0){
                // An earlier sync attempt did not complete (RPC or remote database):
                // hold the row back until its counter reaches the next multiple of
                // RETRY_INTERVAL. The original note describes this as a pause of
                // about two minutes.
                continue;
            }

            $act   = $trigger['act'];
            $entry = array();
            if($act==='delete'){
                $entry['data'] = array('id' => $trigger['tableid']);
            }elseif(isset($crontrigger_by_act[$act])){
                $row = $this->load_source_row($trigger);
                if($row===NULL){
                    // The source row could not be read; the helper has already
                    // logged it and incremented this row's counter. The batch is
                    // dropped and the counters of the other rows stay untouched.
                    return array();
                }
                $row['crontrigger'] = $crontrigger_by_act[$act];
                unset($row['aid']);
                $entry['data'] = $row;
            }

            $entry['table']       = $trigger['tablename'];
            $entry['warehouseid'] = $trigger['warehouseid'];
            $entry['act']         = $act;
            // For an unrecognised act no data was collected, so tableid stays NULL.
            $entry['tableid']     = isset($entry['data']['id']) ? $entry['data']['id'] : NULL;
            $entry['info']        = $trigger['info'];          // field information
            $entry['addtime']     = $trigger['addtime'];       // time of the operation

            $data[$trigger['id']] = $entry;
        }

        $this->increment_counters($update_ids);

        return $data;
    }

    /**
     * Loads the source row a queue entry points at.
     *
     * On failure the attempt is logged and the queue entry's counter is
     * incremented.
     *
     * @param   array  $trigger  one row of triggertable
     * @return  array|NULL  the source row, or NULL when nothing could be read
     */
    private function load_source_row(array $trigger)
    {
        // The table name comes from triggertable and cannot be a bound
        // parameter; the row id is bound so it is always quoted/escaped.
        $sql = "SELECT *
            FROM ".$trigger['tablename']."
            WHERE id = :tableid
            LIMIT 1";
        $row = DB::query(Database::SELECT,$sql)
            ->param(':tableid',$trigger['tableid'])
            ->execute(self::DB_GROUP)
            ->current();

        if(is_array($row)&&count($row)){
            return $row;
        }

        // Temporary diagnostic logging.
        Kohana::$log->add(Log::INFO,ucfirst($trigger['act']).' error message follow');
        Kohana::$log->add(Log::INFO,$sql.' [:tableid='.var_export($trigger['tableid'],TRUE).']');
        Kohana::$log->add(Log::INFO,var_export($row,TRUE));
        Kohana::$log->add(Log::INFO,'Error message end');

        $this->increment_counters(array($trigger['id']));

        return NULL;
    }

    /**
     * Adds 1 to the `counter` column of the given triggertable rows.
     *
     * @param   array  $ids  triggertable ids; nothing is executed for an empty list
     * @return  void
     */
    private function increment_counters(array $ids)
    {
        if(!count($ids)){
            return;
        }
        // implode() with the separator first: the reversed argument order is
        // no longer accepted by PHP 8.
        $sql = "UPDATE triggertable SET counter=counter+1 WHERE id in(".implode(',',array_map('intval',$ids)).")";
        DB::query(Database::UPDATE,$sql)
            ->execute(self::DB_GROUP);
    }

    /**
     * Polls triggertable and syncs one randomly chosen (warehouse, table)
     * pair per iteration until more than MAX_RUN_SECONDS have elapsed.
     *
     * @return  void
     */
    public function Action_rpcdata()
    {
        $start_time = time();

        while(TRUE){
            $sql    = "SELECT warehouseid,tablename FROM triggertable GROUP BY warehouseid,tablename";
            $tables = DB::query(Database::SELECT,$sql)
                ->execute(self::DB_GROUP)
                ->as_array();
            // No rows means there is nothing waiting to be synced.
            if(!(is_array($tables)&&count($tables))){
                sleep(5);
                if(time()-$start_time>self::MAX_RUN_SECONDS){
                    break;
                }
                continue;
            }

            // Sync one table of one warehouse, picked at random.
            $row = array_rand($tables,1);
            $this->rpcdata_single_table($tables[$row]['warehouseid'],$tables[$row]['tablename']);

            if(time()-$start_time>self::MAX_RUN_SECONDS){
                break;
            }
            // Pause 0.2 seconds between batches.
            usleep(200000);
        }
    }

    /**
     * Syncs a single batch of one table of one warehouse over RPC.
     *
     * The payload from getdata() is sent to the 'huizongapiserver' endpoint;
     * the triggertable ids returned by the remote side are deleted from the
     * local queue. A status line is echoed in every case.
     *
     * @param   mixed   $warehouse_id  warehouse id
     * @param   string  $tablename     table name
     * @return  int     number of queue rows the remote side returned (0 when
     *                  there was no data or the call did not return a
     *                  non-empty array)
     */
    public function rpcdata_single_table($warehouse_id,$tablename)
    {
        $config            = Kohana::$config->load('triggerserver');
        $configenvironment = Kohana::$config->load('triggerenvironment');
        $environment_str   = "?environment=".$configenvironment->get('environment');

        $counter  = 0;
        $syn_data = $this->getdata($warehouse_id,$tablename);

        if(!(is_array($syn_data)&&count($syn_data))){
            // Nothing to send.
            echo 'rpcdata is null! ['.$warehouse_id.'-->'.$tablename.':'.$counter.'] '.date('Y-m-d H:i:s')."\n";
            sleep(2);
            return $counter;
        }

        // Every warehouse is sent to the same endpoint.
        $client = new  PHPRPC_Client($config->get('huizongapiserver').$environment_str);

        $triggertable_id_arr = $client->rpcdata($syn_data);
        if(is_array($triggertable_id_arr)&&count($triggertable_id_arr)){
            DB::delete("triggertable")
                ->where("id","in",$triggertable_id_arr)
                ->execute("newerp");
            // Count only here: the result may be a non-countable value,
            // and count() on that raises an error on current PHP versions.
            $counter = count($triggertable_id_arr);
        }else{
            // Dump whatever came back instead of an id list for diagnosis.
            print_r($triggertable_id_arr);
            echo "\n";
        }

        // NOTE(review): this line is printed even when the branch above reported
        // an unexpected result; the text is kept unchanged in case anything
        // parses this output.
        echo 'rpcdata success! ['.$warehouse_id.'-->'.$tablename.':'.$counter.'] '.date('Y-m-d H:i:s')."\n";
        return $counter;
    }

}
